
[SPARK-1022][Streaming] Add Kafka real unit test #1751

Closed
wants to merge 6 commits

Conversation

jerryshao
Contributor

This PR is an updated version of #557 to actually test sending and receiving data through Kafka, and to fix the previous flaky issues.

@tdas, would you mind reviewing this PR? Thanks a lot.

Conflicts:
	external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
	external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
	project/SparkBuild.scala
Conflicts:
	project/SparkBuild.scala
@SparkQA

SparkQA commented Aug 3, 2014

QA tests have started for PR 1751. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17812/consoleFull

@SparkQA

SparkQA commented Aug 3, 2014

QA results for PR 1751:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information, see the test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17812/consoleFull

f
}

def deleteDir(file: File) {
Contributor


You can directly use the spark.Utils functions deleteRecursively() and createTempDir() for this.
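For example, the suite's setup and teardown could lean on those helpers instead of a hand-rolled deleteDir. A minimal sketch, assuming the helpers live in org.apache.spark.util.Utils as elsewhere in the codebase:

```scala
import java.io.File
import org.apache.spark.util.Utils

// Sketch: manage the test's temp directory with Spark's own utilities.
var brokerLogDir: File = _

def beforeFunction(): Unit = {
  brokerLogDir = Utils.createTempDir() // instead of manual temp-dir creation
}

def afterFunction(): Unit = {
  Utils.deleteRecursively(brokerLogDir) // instead of the custom deleteDir(file)
}
```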

@tdas
Contributor

tdas commented Aug 4, 2014

I took a look and this is good; I had a few comments.

ssc.stop()
}

private def getBrokerConfig(port: Int): Properties = {
Contributor


It's better to move all these utility functions for the local Kafka test harness into a separate class, say KafkaTestUtils.
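A hypothetical outline of such a class, mirroring the setup code in this PR; the class name, constructor parameters, and method signatures here are assumptions, not the API the PR ended up with:

```scala
import java.io.File
import java.net.InetSocketAddress
import java.util.Properties
import kafka.server.{KafkaConfig, KafkaServer}
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}

// Sketch of a reusable embedded ZooKeeper + Kafka broker test harness.
class KafkaTestUtils(zkPort: Int) {
  private var zkFactory: NIOServerCnxnFactory = _
  private var broker: KafkaServer = _

  def setup(brokerProps: Properties): Unit = {
    // Start an embedded ZooKeeper server backed by a temp directory.
    val dataDir = new File(System.getProperty("java.io.tmpdir"), "zk-test")
    val zk = new ZooKeeperServer(dataDir, dataDir, 500)
    zkFactory = new NIOServerCnxnFactory()
    zkFactory.configure(new InetSocketAddress("localhost", zkPort), 16)
    zkFactory.startup(zk)
    // Start an embedded Kafka broker pointing at that ZooKeeper.
    broker = new KafkaServer(new KafkaConfig(brokerProps))
    broker.startup()
  }

  def teardown(): Unit = {
    if (broker != null) broker.shutdown()
    if (zkFactory != null) zkFactory.shutdown()
  }
}
```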

@jerryshao
Contributor Author

Hi TD, thanks for your review. I will update the code according to your comments.

@SparkQA

SparkQA commented Aug 5, 2014

QA tests have started for PR 1751. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17911/consoleFull

@SparkQA

SparkQA commented Aug 5, 2014

QA results for PR 1751:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information, see the test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17911/consoleFull

@tdas
Contributor

tdas commented Aug 5, 2014

Thanks @jerryshao I have merged this!

asfgit pushed a commit that referenced this pull request Aug 5, 2014
This PR is an updated version of #557 to actually test sending and receiving data through Kafka, and to fix the previous flaky issues.

@tdas, would you mind reviewing this PR? Thanks a lot.

Author: jerryshao <[email protected]>

Closes #1751 from jerryshao/kafka-unit-test and squashes the following commits:

b6a505f [jerryshao] code refactor according to comments
5222330 [jerryshao] Change JavaKafkaStreamSuite to better test it
5525f10 [jerryshao] Fix flaky issue of Kafka real unit test
4559310 [jerryshao] Minor changes for Kafka unit test
860f649 [jerryshao] Minor style changes, and tests ignored due to flakiness
796d4ca [jerryshao] Add real Kafka streaming test
@asfgit asfgit closed this in e87075d Aug 5, 2014
@markhamstra
Contributor

@tdas @pwendell This broke the Maven build:

```
~/Apache/spark(branch-1.1|✔) ➤ mvn -U -DskipTests clean install
.
.
.
[error] Apache/spark/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala:36: object NIOServerCnxnFactory is not a member of package org.apache.zookeeper.server
[error] import org.apache.zookeeper.server.NIOServerCnxnFactory
[error]        ^
[error] Apache/spark/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala:199: not found: type NIOServerCnxnFactory
[error]     val factory = new NIOServerCnxnFactory()
[error]                       ^
[error] two errors found
[error] Compile failed at Aug 5, 2014 1:42:36 PM [0.503s]
```

@tdas
Contributor

tdas commented Aug 5, 2014

Looking into it.

@tdas
Contributor

tdas commented Aug 5, 2014

Reproduced it by compiling with maven locally. Since this passed Jenkins, I am pretty sure that it passes with SBT. Still compiling with SBT for sanity.

@tdas
Contributor

tdas commented Aug 5, 2014

Okay, sbt/sbt compile:test works, so this is a Maven-only issue. I am not sure how that happened, especially since our SBT build now directly depends on the Maven POMs.
@pwendell Any idea!?

@srowen
Member

srowen commented Aug 6, 2014

The problem is that the test suite uses ZooKeeper classes but does not depend on the zookeeper artifact. The most direct fix, I believe, is to introduce a test dependency on org.apache.zookeeper:zookeeper, which is already configured in the parent.

How does it work with SBT? Well, zookeeper is a transitive dependency of Kafka, and Kafka is a non-test dependency of this module, so SBT presumably includes it. Maven, weirdly, does not include transitive dependencies of test dependencies.
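One way to check this kind of divergence (a sketch, assuming a local checkout of the repo) is to print Maven's resolved tree for the Kafka module, filtered to ZooKeeper:

```shell
# Show which ZooKeeper version Maven actually resolves for this module;
# Maven's "nearest in tree" rule decides which version wins.
mvn -pl external/kafka dependency:tree -Dincludes=org.apache.zookeeper
```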

In fact spark-core uses zookeeper too and doesn't express the dependency. It's also getting away with it because Curator depends on Zookeeper. That's kind of OK since Curator is a ZK utility of sorts. But not technically right, and means that the ZK version in the build is not actually controlled by its zookeeper.properties value. (It's unused.)

I'll make a PR tomorrow if nobody has patched it already. It'd be good to get both of those corrected, but the immediate issue is the test compilation issue.

@tdas
Contributor

tdas commented Aug 6, 2014

I don't think that is the problem. The real problem is how SBT and Maven resolve multiple versions of the same library, which in this case is Zookeeper. Observing and comparing the dependency trees from Maven and SBT showed this. Spark depends on ZK 3.4.5, whereas Apache Kafka transitively depends on ZK 3.3.4. SBT decides to evict 3.3.4 and use the higher version, 3.4.5, but Maven sticks with the version closest in the dependency tree, 3.3.4. And 3.3.4 does not have NIOServerCnxnFactory.

The solution is probably to exclude zookeeper from the apache-kafka dependency in the streaming-kafka module so that it just inherits zookeeper from Spark core.
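In POM terms, that exclusion would look roughly like the following in the streaming-kafka module (a sketch; the exact artifactId here, with its Scala version suffix, is an assumption):

```xml
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <exclusions>
    <exclusion>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```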

@tdas
Contributor

tdas commented Aug 6, 2014

Trying that right now

@srowen
Member

srowen commented Aug 6, 2014

I think you are closer to right; the error would be different with no zookeeper dep at all. Actually, Spark core forgot to depend on zookeeper. If you fix that in core and here, it should work. No exclude needed.

@tdas
Contributor

tdas commented Aug 6, 2014

I am not sure adding a direct dependency on zookeeper in the core is a good idea. What if, in the future, we no longer depend on Curator? If no one remembers why we added Zookeeper, we will still have an unnecessary zookeeper dependency in core. In fact, in general, the reason Kafka, Flume, and the other stuff were moved to external was so that they have no bearing on the dependencies in core. So if there is a solution that fixes the external stuff without touching the dependencies of core, that's better.

What do you think? @pwendell

@tdas
Contributor

tdas commented Aug 6, 2014

So excluding zookeeper from the apache-kafka dependency fixed it for Maven builds. Submitted PR #1797

@jerryshao
Contributor Author

Hi @tdas and @srowen, sorry for the incomplete testing and compiling. I only compiled and tested under SBT, where it looked fine, so I missed the Maven part. Thanks a lot for fixing this issue :).

asfgit pushed a commit that referenced this pull request Aug 6, 2014
#1751 caused maven builds to fail.

```
~/Apache/spark(branch-1.1|✔) ➤ mvn -U -DskipTests clean install
.
.
.
[error] Apache/spark/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala:36: object NIOServerCnxnFactory is not a member of package org.apache.zookeeper.server
[error] import org.apache.zookeeper.server.NIOServerCnxnFactory
[error]        ^
[error] Apache/spark/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala:199: not found: type NIOServerCnxnFactory
[error]     val factory = new NIOServerCnxnFactory()
[error]                       ^
[error] two errors found
[error] Compile failed at Aug 5, 2014 1:42:36 PM [0.503s]
```

The problem is how SBT and Maven resolve multiple versions of the same library, which in this case is Zookeeper. Observing and comparing the dependency trees from Maven and SBT showed this. Spark depends on ZK 3.4.5, whereas Apache Kafka transitively depends on ZK 3.3.4. SBT decides to evict 3.3.4 and use the higher version, 3.4.5, but Maven sticks with the version closest in the dependency tree, 3.3.4. And 3.3.4 does not have NIOServerCnxnFactory.

The solution in this patch excludes zookeeper from the apache-kafka dependency in streaming-kafka module so that it just inherits zookeeper from Spark core.

Author: Tathagata Das <[email protected]>

Closes #1797 from tdas/kafka-zk-fix and squashes the following commits:

94b3931 [Tathagata Das] Fixed zookeeper dependency of Kafka
(cherry picked from commit ee7f308)

Signed-off-by: Patrick Wendell <[email protected]>
@srowen
Member

srowen commented Aug 6, 2014

@tdas the problem is that the dependency is already there. Spark core uses Zookeeper classes directly in SparkCuratorUtil. If you remove Curator, the code no longer compiles. Now, granted, if you removed Curator, you'd be removing this class too. So it's kind of OK to let the transitive dependency happen in practice in Spark Core.

However, it is not pulling in the version declared in zookeeper.version (necessarily). In fact, that property is not used at all.

It exists I think for the benefit of vendors who are building the whole thing for a system that uses a particular zookeeper version, as evidenced by its presence in the MapR build. I think the intent is to control the version Curator depends on. So I think there is an intent for Core to depend directly on ZK for this reason?

I wouldn't agree that letting the transitive dependency happen to cover this is a good idea for the Kafka test though. There, it uses Zookeeper independently of Curator. It's a test dependency too so doesn't affect the non-test artifacts.

It would be more correct/robust to simply express the dependency on zookeeper in this case. I'll open a PR that shows what that looks like.
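Expressed directly, that test-scoped dependency would look roughly like this in the module POM (a sketch; the version is assumed to be managed by the parent):

```xml
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <scope>test</scope>
</dependency>
```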

@srowen
Member

srowen commented Aug 6, 2014

@tdas @pwendell Have a look at #1804

@tdas
Contributor

tdas commented Aug 6, 2014

Well, #1797 seems to have fixed the maven build.

I get the logic that it might be better to explicitly depend and therefore control the zookeeper version used by Spark core. I am not entirely sure how important that is, so I will let @pwendell chime in on this.

Also moving this discussion to the new thread #1804

@srowen
Member

srowen commented Aug 6, 2014

To me, it would be about as sensible to delete zookeeper.version as unused rather than use it to explicitly control the dependency that core brings in. No argument there. I think it's incorrect to use Zookeeper in the Kafka suite and then happen to rely on Spark Core to bring in ZK as a dependency-of-a-dependency. Why not just express it correctly? But the build is highly likely to get away with it for a long time :) I can nix the second commit and replace it with deleting zookeeper.version, although I still mildly stand by the first one.
